package com.yance.fim.websocket.handler;


import cn.hutool.core.thread.ExecutorBuilder;
import com.yance.fim.utils.CommandStateConstants;
import com.yance.fim.utils.Constants;
import com.yance.fim.utils.Pair;
import com.yance.fim.utils.Utils;
import com.yance.fim.websocket.packet.DataPacket;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


/**
 * 消息分发器
 */
public class MessageDispatcher {

    private static final ExecutorService executorService = Executors.newSingleThreadExecutor();

    private static Map<Integer, Pair<DefaultRequestHandler, ExecutorService>> handlersMap = new HashMap<>();

    private static MessageDispatcher messageDispatcher = null;

    public MessageDispatcher() {
        registerMessageHandler();
    }

    public static MessageDispatcher getInstance() {
        if (messageDispatcher == null) {
            synchronized (MessageDispatcher.class) {
                if (messageDispatcher == null) {
                    messageDispatcher = new MessageDispatcher();
                }
            }
        }
        return messageDispatcher;
    }

    /**
     * 分发器
     * <p>指定一个线程处理消息分发器的中转，然后再由各个消息处理器进行处理</p>
     */
    public void dispatcher(DataPacket dataPacket) {
        executorService.execute(() -> {
            try {
                int command = dataPacket.getCommand();
                Pair<DefaultRequestHandler, ExecutorService> pair = handlersMap.get(command);
                if (pair != null) {
                    pair.getRight().execute(() -> pair.getLeft().processRequest(dataPacket));
                } else {
                    ImExecptionHandler.ImExecptionClose(dataPacket.getChannelContext(), CommandStateConstants.MESSAGE_PROTOCOL_ERROR);
                }
            } catch (Exception e) {
                Utils.ConsoleExecptionLog(e);
                ImExecptionHandler.ImExecptionClose(dataPacket.getChannelContext(), CommandStateConstants.MESSAGE_DISPATCHER_EXCEPTION);
            }
        });
    }

    /**
     * 集成器
     * <p>给每个处理器创建线程池</p>
     */
    private void registerMessageHandler() {
        try {
            for (Class cls : Utils.getClassPathAllClass(MessageDispatcher.class)) {
                Command annotation = (Command) cls.getAnnotation(Command.class);
                if (annotation != null) {
                    DefaultRequestHandler handler = (DefaultRequestHandler) cls.newInstance();
                    handlersMap.put(annotation.value(), new Pair<>(handler, ExecutorBuilder.create()
                            .setCorePoolSize(Constants.SUB_PROCESSOR_CORE_POOL_SIZE)
                            .setMaxPoolSize(Constants.SUB_PROCESSOR_MAX_POOL_SIZE)
                            .useSynchronousQueue()
                            .build()));
                }
            }
        } catch (Exception e) {
            Utils.ConsoleExecptionLog(e);
        }
    }
}
